//===----------------------------------------------------------------------===//
//
//                         BusTub
//
// delete_executor.cpp
//
// Identification: src/execution/delete_executor.cpp
//
// Copyright (c) 2015-2021, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

#include <memory>

#include "execution/executors/delete_executor.h"

namespace bustub {

DeleteExecutor::DeleteExecutor(ExecutorContext *exec_ctx, const DeletePlanNode *plan,
                               std::unique_ptr<AbstractExecutor> &&child_executor)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      child_executor_(std::move(child_executor)),
      table_info_(exec_ctx_->GetCatalog()->GetTable(plan_->TableOid())) {}

void DeleteExecutor::Init() {
  child_executor_->Init();
  auto oid = plan_->TableOid();
  auto txn = exec_ctx_->GetTransaction();
  auto lock_mgr = exec_ctx_->GetLockManager();
  lock_mgr->LockTable(txn, LockManager::LockMode::INTENTION_EXCLUSIVE, oid);
}

void DeleteExecutor::NextResult(Tuple *tuple, RID *rid) {
  Value value = {INTEGER, rows_};
  Column col = {"foo", INTEGER};
  auto dummy_schema = Schema{{col}};
  *tuple = Tuple{{value}, &GetOutputSchema()};
}

auto DeleteExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool {
  LOG_DEBUG("Next0");
  while (child_executor_->Next(tuple, rid)) {
    // 删除,标记一下tuple元信息即可
    auto tuple_meta = table_info_->table_->GetTupleMeta(*rid);
    tuple_meta.is_deleted_ = true;
    auto oid = plan_->TableOid();
    auto txn = exec_ctx_->GetTransaction();
    auto lock_mgr = exec_ctx_->GetLockManager();
    try {
      bool res = lock_mgr->LockRow(txn, LockManager::LockMode::EXCLUSIVE, oid, *rid);
      if (!res) {
        throw ExecutionException("delete");
      }
    } catch (TransactionAbortException) {
      throw ExecutionException("delete");
    }
    table_info_->table_->UpdateTupleMeta(tuple_meta, *rid);

    // 更新写集
    TableWriteRecord w_record{oid, *rid, table_info_->table_.get()};
    w_record.wtype_ = WType::DELETE;
    txn->AppendTableWriteRecord(w_record);

    // 删除索引条目
    auto transaction = exec_ctx_->GetTransaction();
    auto indexs = exec_ctx_->GetCatalog()->GetTableIndexes(table_info_->name_);
    for (auto &&index_info : indexs) {
      LOG_DEBUG("index=%s, tuple:%s", index_info->name_.c_str(),
                tuple->ToString(&child_executor_->GetOutputSchema()).c_str());
      index_info->index_->DeleteEntry(tuple->KeyFromTuple(child_executor_->GetOutputSchema(), index_info->key_schema_,
                                                          index_info->index_->GetKeyAttrs()),
                                      *rid, transaction);
      // 更新写集
      IndexWriteRecord idx_w_record{*rid, oid, WType::DELETE, *tuple, index_info->index_oid_, exec_ctx_->GetCatalog()};
      txn->AppendIndexWriteRecord(idx_w_record);
    }
    rows_++;
  }
  NextResult(tuple, rid);
  LOG_DEBUG("Next5");
  bool ret = false;
  ret = !return_trued_;
  return_trued_ = true;
  return ret;
}

}  // namespace bustub
